Token Counting and Context Management
Before reading any explanation, predict what this prints:
import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")
texts = [
    "Hello, world!",
    "Hello,  world!",   # two spaces
    "Hello,\nworld!",   # newline instead of space
    "hello, world!",    # lowercase h
]
for t in texts:
    print(len(enc.encode(t)), repr(t))
Write your predictions down. Then continue.
# Output:
# 4 'Hello, world!'
# 5 'Hello,  world!'
# 4 'Hello,\nworld!'
# 4 'hello, world!'
Two spaces between "Hello," and "world!" cost an extra token. A newline does not. Lowercasing the h selects a different token but leaves the count unchanged. This is not a curiosity -- it is the kind of detail that causes your production application to silently truncate user messages, overflow context windows, and charge you more than you budgeted.
Tokens are the fundamental unit of LLM economics. Every API call is priced in tokens, every context window is measured in tokens, and every model's capabilities are bounded by how many tokens it can attend to at once. This lesson teaches you to count, manage, and budget tokens as a first-class engineering concern.
What You Will Learn
- How BPE tokenisation works and why different models tokenise differently
- Using tiktoken for exact token counts before making API calls
- Counting tokens for Anthropic, Cohere, and open-source models
- Context window limits and the consequences of exceeding them
- Sliding window strategies for documents that exceed the context limit
- Building a ContextManager class that tracks token budgets and trims history
- Cost estimation and session-level spend tracking
- Prompt compression techniques
- Dynamic context selection by relevance scoring
- Multi-turn conversation management: when to summarise vs keep history
Prerequisites
- Familiarity with the OpenAI and Anthropic Python SDKs (Lesson 1)
- Basic understanding of how LLM chat completions work
- Python dataclasses, type hints, and collections
Part 1 -- How Tokenisation Actually Works
Byte-Pair Encoding
Modern LLMs do not process characters or words directly. They operate on tokens -- variable-length byte sequences that represent common character patterns. The tokeniser is trained separately from the model, using a merge-based algorithm called Byte-Pair Encoding (BPE).
The algorithm:
- Start with a vocabulary of individual bytes (256 tokens for all possible byte values).
- Count every pair of adjacent tokens in the training corpus and pick the most frequent pair.
- Merge that pair into a single new token and add it to the vocabulary.
- Repeat until the vocabulary reaches the target size (GPT-4's tokeniser has ~100,000 tokens).
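The steps above can be sketched as a toy trainer. This is a minimal illustration on a four-word corpus, not how tiktoken or any production tokeniser is implemented; the function name `train_bpe` and the corpus are made up for the example, and real trainers add pre-tokenisation rules and run over far larger data.

```python
from collections import Counter


def train_bpe(corpus: list[str], num_merges: int) -> list[tuple[bytes, bytes]]:
    """Learn up to `num_merges` merge rules from a tiny corpus (toy sketch)."""
    # Step 1: start from individual bytes; each word is a list of 1-byte tokens.
    words = [[bytes([b]) for b in word.encode("utf-8")] for word in corpus]
    merges: list[tuple[bytes, bytes]] = []
    for _ in range(num_merges):
        # Step 2: count every pair of adjacent tokens across the corpus.
        pairs: Counter = Counter()
        for tokens in words:
            pairs.update(zip(tokens, tokens[1:]))
        if not pairs:
            break  # every word is already a single token
        (left, right), _freq = pairs.most_common(1)[0]
        merges.append((left, right))
        # Step 3: replace each occurrence of the pair with the merged token.
        for i, tokens in enumerate(words):
            merged: list[bytes] = []
            j = 0
            while j < len(tokens):
                if j + 1 < len(tokens) and tokens[j] == left and tokens[j + 1] == right:
                    merged.append(left + right)
                    j += 2
                else:
                    merged.append(tokens[j])
                    j += 1
            words[i] = merged
    return merges


merges = train_bpe(["low", "lower", "lowest", "low"], num_merges=2)
print(merges)  # after two merges the frequent prefix "low" is a single token
```

Because "low" appears in every word, its bytes are merged first, which is the same pressure that turns common English words into single tokens in a real vocabulary.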
The result: common English words and subwords become single tokens. Rare words, misspellings, and non-English text get split into multiple tokens.
import tiktoken
enc = tiktoken.encoding_for_model("gpt-4o")
# Common English words are typically single tokens
single_token_words = ["the", "and", "Python", "function", "model"]
for word in single_token_words:
    tokens = enc.encode(word)
    decoded = [enc.decode([t]) for t in tokens]
    print(f"{word!r:15} -> {len(tokens)} token(s): {decoded}")
# Output:
# 'the' -> 1 token(s): ['the']
# 'and' -> 1 token(s): ['and']
# 'Python' -> 1 token(s): ['Python']
# 'function' -> 1 token(s): ['function']
# 'model' -> 1 token(s): ['model']
# Rare words, typos, and code symbols get split into multiple tokens
split_examples = [
"GPT-4o",
"tokenisation", # British spelling -- less common in US training data
"antidesestablishmentarianism",
"```python",
"def my_function_name():",
]
for text in split_examples:
    tokens = enc.encode(text)
    decoded = [enc.decode([t]) for t in tokens]
    print(f"{text!r:40} -> {len(tokens):2} tokens: {decoded}")
# Sample output (illustrative; exact splits depend on the encoding version):
# 'GPT-4o' -> 4 tokens: ['G', 'PT', '-', '4o']
# 'tokenisation' -> 3 tokens: [...]
# 'antidesestablishmentarianism' -> 12 tokens: [...]
# '```python' -> 3 tokens: ['``', '`', 'python']
# 'def my_function_name():' -> 9 tokens: [...]
:::note Why This Matters for Cost
Non-English text, code in unusual syntaxes, and heavily formatted content (JSON, markdown, XML) tokenise less efficiently than plain English prose. A Chinese character typically costs 2-3 tokens where one English word costs 1. A single JSON key-value pair like "user_id": 12345 may cost 7-8 tokens. Model your tokenisation costs against your actual input distribution, not generic benchmarks.
:::
Visualising Token Boundaries
def visualise_tokens(text: str, model: str = "gpt-4o") -> None:
    """Print text with token boundaries marked.

    Useful for debugging prompts where you suspect tokenisation
    is splitting words in unexpected ways.
    """
    enc = tiktoken.encoding_for_model(model)
    tokens = enc.encode(text)
    # Decode each token individually to see the splits
    parts = [enc.decode([t]) for t in tokens]
    print(f"Text: {text!r}")
    print(f"Token count: {len(tokens)}")
    print(f"Tokens: {parts}")
    # Visual representation with | separators
    visualised = "|".join(parts)
    print(f"Visualised: |{visualised}|")
    print()

visualise_tokens("def calculate_embedding_similarity(vec_a, vec_b):")
# Illustrative output (exact splits depend on the encoding):
# Text: 'def calculate_embedding_similarity(vec_a, vec_b):'
# Token count: 12
# Tokens: ['def', ' calculate', '_embedding', '_similarity', '(', 'vec', '_a', ',', ' vec', '_b', ')', ':']
# Visualised: |def| calculate|_embedding|_similarity|(|vec|_a|,| vec|_b|)|:|
Model-Specific Tokenisers
Different model families use different tokenisers. Never assume they are interchangeable:
import tiktoken
# gpt-3.5-turbo and gpt-4 share the cl100k_base tokeniser
# gpt-4o uses o200k_base (larger vocabulary, more efficient for non-English and code)
enc_gpt4 = tiktoken.encoding_for_model("gpt-4")
enc_gpt4o = tiktoken.encoding_for_model("gpt-4o")
test_cases = [
"Hello, I am an AI assistant.", # English prose
"def embed(text: str) -> list[float]:", # Python code
"Bonjour, je suis un assistant IA.", # French
'{"user": "alice", "score": 0.95}', # JSON
]
print(f"{'Text':<45} gpt-4 gpt-4o")
print("-" * 60)
for text in test_cases:
    t4 = len(enc_gpt4.encode(text))
    t4o = len(enc_gpt4o.encode(text))
    diff = t4 - t4o
    marker = f"(-{diff})" if diff > 0 else ""
    print(f"{text!r:<45} {t4:5} {t4o:5} {marker}")
# Get encoding by name rather than model name.
# Useful when the model name changes but the encoding stays the same.
enc_cl100k = tiktoken.get_encoding("cl100k_base") # gpt-4, gpt-3.5-turbo
enc_o200k = tiktoken.get_encoding("o200k_base") # gpt-4o, gpt-4o-mini
enc_p50k = tiktoken.get_encoding("p50k_base") # text-davinci-003, Codex
# List all known encodings
print(tiktoken.list_encoding_names())
# e.g. ['gpt2', 'r50k_base', 'p50k_base', 'p50k_edit', 'cl100k_base', 'o200k_base']
# (the list varies with the installed tiktoken version)
Part 2 -- Counting Tokens Before API Calls
Single String Counting
import tiktoken
from typing import Literal
ModelName = Literal[
"gpt-4o", "gpt-4o-mini", "gpt-4", "gpt-4-turbo", "gpt-3.5-turbo"
]
def count_tokens(text: str, model: ModelName = "gpt-4o") -> int:
    """Count tokens for a plain string.

    Use this for individual pieces of text -- a system prompt, a document,
    a retrieved chunk. Does NOT account for chat message formatting overhead.
    Use count_chat_tokens() for full message lists.
    """
    enc = tiktoken.encoding_for_model(model)
    return len(enc.encode(text))

print(count_tokens("What is the capital of France?"))  # 7
# gpt-4 uses cl100k_base, not o200k_base, so counts can differ between the two
print(count_tokens("What is the capital of France?", "gpt-4"))
Chat Message Token Counting
This is where most engineers get tripped up. When you send a list of messages to the chat completions API, OpenAI wraps each message in formatting tokens you do not see in the content. The exact overhead is documented but easy to miss.
import tiktoken
from typing import TypedDict
class Message(TypedDict):
    role: str
    content: str

def count_chat_tokens(messages: list[Message], model: str = "gpt-4o") -> int:
    """Count tokens for a list of chat messages, including formatting overhead.

    Per-message overhead (for gpt-4, gpt-3.5-turbo-0613+, gpt-4o):
    - 3 tokens for the message delimiter: <|im_start|>role\n...<|im_end|>\n
    - The role string itself (1 token for "user" / "assistant" / "system")
    Plus 3 tokens added to the total for the reply primer:
    <|im_start|>assistant

    Reference: https://platform.openai.com/docs/guides/chat/managing-tokens
    """
    try:
        enc = tiktoken.encoding_for_model(model)
    except KeyError:
        # Graceful fallback for unknown or future models
        enc = tiktoken.get_encoding("cl100k_base")
    tokens_per_message = 3  # delimiter overhead per message
    tokens_per_name = 1     # 'name' field in a message adds 1 token
    total = 0
    for message in messages:
        total += tokens_per_message
        for key, value in message.items():
            # Every field value is encoded, so the role string is counted here
            total += len(enc.encode(value))
            if key == "name":
                total += tokens_per_name
    total += 3  # Reply primer: every response is primed with <|im_start|>assistant
    return total
# Verify against real usage: build a typical conversation and count
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is the capital of France?"},
{"role": "assistant", "content": "The capital of France is Paris."},
{"role": "user", "content": "What is its population?"},
]
counted = count_chat_tokens(messages)
print(f"Estimated tokens: {counted}")
# Compare with response.usage.prompt_tokens after an API call to verify accuracy
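One way to act on that comparison is to log the relative error between your estimate and the count the API reports. The sketch below assumes `actual` comes from `response.usage.prompt_tokens`; the helper names and the 2% tolerance are illustrative choices, not part of any SDK.

```python
def estimate_drift(estimated: int, actual: int) -> float:
    """Signed relative error of an estimate against the API-reported count."""
    if actual <= 0:
        raise ValueError("actual must be a positive token count")
    return (estimated - actual) / actual


def estimate_is_trustworthy(estimated: int, actual: int, tolerance: float = 0.02) -> bool:
    """True when the estimate is within `tolerance` of the reported count.

    The 2% default is an arbitrary starting point; tune it for your workload.
    """
    return abs(estimate_drift(estimated, actual)) <= tolerance


# Hypothetical numbers: our counter said 99, the API reported 100.
print(f"{estimate_drift(99, 100):+.1%}")
print(estimate_is_trustworthy(99, 100))
```

A drift that grows over time usually means the message format or the model's encoding changed underneath your counter.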
Why Count Before Calling?
The API's behaviour when you exceed the context window varies:
| Scenario | OpenAI | Anthropic |
|---|---|---|
| Context exceeded | BadRequestError: context_length_exceeded | BadRequestError: prompt_too_long |
| Near limit (no overflow) | Completes normally | Completes normally |
| Silent truncation | Never (fails explicitly) | Never (fails explicitly) |
| Streaming + overflow | Error before any tokens stream | Error before any tokens stream |
Both APIs fail loudly, not silently. But a rejected call still costs a network round-trip and leaves your application with an error to handle at the worst possible moment. Count first.
import openai
from openai import BadRequestError
# Context window limits as of early 2025 -- verify current limits
MODEL_CONTEXT_LIMITS: dict[str, int] = {
"gpt-4o": 128_000,
"gpt-4o-mini": 128_000,
"gpt-4-turbo": 128_000,
"gpt-4": 8_192,
"gpt-3.5-turbo": 16_385,
"claude-opus-4-6": 200_000,
"claude-3-5-sonnet-20241022": 200_000,
"claude-3-haiku-20240307": 200_000,
}
def safe_chat_complete(
    messages: list[Message],
    model: str = "gpt-4o",
    max_tokens: int = 1_000,
) -> str:
    """Chat completion with pre-flight token budget check.

    Raises ValueError before making the API call if the request
    would exceed the model's context window.
    """
    client = openai.OpenAI()
    limit = MODEL_CONTEXT_LIMITS.get(model, 128_000)
    input_tokens = count_chat_tokens(messages, model)
    # Total required = input + reserved output
    if input_tokens + max_tokens > limit:
        raise ValueError(
            f"Context overflow: {input_tokens:,} input + {max_tokens:,} max_output "
            f"= {input_tokens + max_tokens:,} tokens, exceeds {model} limit of {limit:,}. "
            f"Available for input: {limit - max_tokens:,} tokens."
        )
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
        )
        # content is Optional in the SDK; normalise None to an empty string
        return response.choices[0].message.content or ""
    except BadRequestError as e:
        # This should not happen after a passing pre-flight check,
        # but guard against edge cases (e.g., injected content, API changes)
        raise RuntimeError(f"API rejected request despite passing pre-flight check: {e}") from e
Part 3 -- Counting Tokens for Anthropic Models
Anthropic does not use tiktoken. Their tokeniser is separate and not released as a standalone library. The correct approach is the Anthropic SDK's dedicated counting endpoint.
import anthropic
client = anthropic.Anthropic()
def count_anthropic_tokens(
    messages: list[dict],
    system: str = "",
    model: str = "claude-opus-4-6",
) -> int:
    """Count tokens using Anthropic's server-side counting API.

    This is a real network call. It costs nothing in API fees but adds
    latency (~50-200ms). Cache the result for repeated calls with the
    same content. Use the fast heuristic for high-frequency pre-flight
    checks where exact accuracy is not critical.
    """
    response = client.messages.count_tokens(
        model=model,
        system=system,
        messages=messages,
    )
    return response.input_tokens
# Example usage
system = "You are a helpful assistant specialised in Python engineering."
messages = [
{"role": "user", "content": "Explain asyncio event loops in 3 sentences."}
]
token_count = count_anthropic_tokens(messages, system=system)
print(f"Input tokens: {token_count}")
# When a network call is not acceptable (e.g., sub-millisecond pre-flight
# checks in a hot path), use a character-based heuristic.
# Rule of thumb: 1 token ~= 3.5-4 characters for English prose.
# This is inaccurate for code, markdown, and non-English text.
def estimate_tokens_fast(text: str) -> int:
    """Fast token count heuristic for rough budget checks.

    Error margin: ~10-15% for English prose, up to 30% for code/JSON.
    Use ONLY for monitoring/logging, never for hard budget enforcement.
    """
    # Character-based estimate: ~4 chars per token
    char_estimate = len(text) / 4
    # Word-based estimate: ~1.3 tokens per word (accounts for punctuation, etc.)
    word_estimate = len(text.split()) * 1.3
    # Average both estimates for slightly better accuracy across text types
    return int((char_estimate + word_estimate) / 2)
Part 4 -- Context Window Limits
The Budget Picture
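The context window is not just your input content. It encompasses everything the model processes: the system prompt, the conversation history, any retrieved documents, the current user message, and the tokens reserved for the output.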
Production systems that feel fast in development often break in real usage because the conversation history grows silently turn by turn. A 128K window sounds enormous until you factor in:
- A 30-turn conversation with medium-length responses: ~15,000 tokens
- A retrieved document set: ~40,000 tokens
- A verbose system prompt: ~2,000 tokens
- The reserved output budget: ~4,000 tokens
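Together that is roughly 61,000 tokens, almost half of a 128K window, spent before the user's actual question is counted, and the history share keeps growing with every turn.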
from dataclasses import dataclass
@dataclass
class TokenBudget:
    """Tracks token allocation for a single LLM request.

    Use this to reason about where your context window is going
    before constructing the final messages list.
    """
    model: str
    system_tokens: int = 0
    history_tokens: int = 0
    document_tokens: int = 0
    user_message_tokens: int = 0
    reserved_output_tokens: int = 2_000

    @property
    def limit(self) -> int:
        return MODEL_CONTEXT_LIMITS.get(self.model, 128_000)

    @property
    def used(self) -> int:
        return (
            self.system_tokens
            + self.history_tokens
            + self.document_tokens
            + self.user_message_tokens
            + self.reserved_output_tokens
        )

    @property
    def remaining(self) -> int:
        return self.limit - self.used

    @property
    def utilisation_pct(self) -> float:
        return (self.used / self.limit) * 100

    def fits(self) -> bool:
        """True if the request will fit within the context window."""
        return self.remaining >= 0

    def __str__(self) -> str:
        bar_len = 40
        # Clamp so an over-budget request does not overflow the bar
        filled = min(bar_len, int(bar_len * self.utilisation_pct / 100))
        bar = "#" * filled + "-" * (bar_len - filled)
        return (
            f"[{bar}] {self.utilisation_pct:.1f}%\n"
            f" Used: {self.used:>9,} tokens\n"
            f" Limit: {self.limit:>9,} tokens\n"
            f" Remaining: {self.remaining:>9,} tokens\n"
            f" Breakdown: system={self.system_tokens:,} history={self.history_tokens:,} "
            f"docs={self.document_tokens:,} user={self.user_message_tokens:,} "
            f"output={self.reserved_output_tokens:,}"
        )

# Example: plan a request before building it
budget = TokenBudget(
    model="gpt-4o",
    system_tokens=500,
    history_tokens=8_000,
    document_tokens=25_000,
    user_message_tokens=150,
    reserved_output_tokens=4_000,
)
print(budget)
print(f"Fits: {budget.fits()}")
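The same arithmetic can be turned around to answer "how many document tokens can I afford?" before retrieval runs. The helper below is a hypothetical sketch, not part of the class above; it takes the window limit directly and an optional safety margin expressed as a whole percentage, and it reuses the figures from the example request.

```python
def max_document_tokens(
    limit: int,
    system_tokens: int,
    history_tokens: int,
    user_message_tokens: int,
    reserved_output_tokens: int,
    safety_margin_pct: int = 0,
) -> int:
    """Largest document allocation that still fits, given everything else."""
    # Integer arithmetic keeps the result exact for whole-percent margins
    usable = limit * (100 - safety_margin_pct) // 100
    fixed = system_tokens + history_tokens + user_message_tokens + reserved_output_tokens
    return max(0, usable - fixed)


# Same allocations as the example request, on a 128,000-token window
print(max_document_tokens(128_000, 500, 8_000, 150, 4_000))
# → 115350
print(max_document_tokens(128_000, 500, 8_000, 150, 4_000, safety_margin_pct=15))
# → 96150
```

Computing the document allowance last treats retrieval as the flexible part of the budget, which is usually the easiest component to shrink.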
Part 5 -- Sliding Window Strategies for Long Documents
When a document exceeds the context window, it has to be split before the model sees it. The strategies below cover the common cases. Choose based on the task:
Fixed-Size Chunking with Overlap
import tiktoken
from typing import Iterator
def chunk_text(
    text: str,
    chunk_size: int = 2_000,
    overlap: int = 200,
    model: str = "gpt-4o",
) -> Iterator[str]:
    """Split text into overlapping token-based chunks.

    Overlap ensures that context is not lost at chunk boundaries.
    A sentence that starts near the end of chunk N and continues into
    chunk N+1 appears in both. Without overlap, the model processing
    chunk N has no idea the sentence was incomplete.

    Args:
        text: The text to split.
        chunk_size: Maximum tokens per chunk.
        overlap: Tokens repeated from the end of the previous chunk.
        model: Model name for tokeniser selection.

    Yields:
        Text strings each within the token limit.
    """
    if overlap < 0 or overlap >= chunk_size:
        # With overlap >= chunk_size the window would never advance
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")
    enc = tiktoken.encoding_for_model(model)
    tokens = enc.encode(text)
    if len(tokens) <= chunk_size:
        # Document fits in one chunk -- no splitting needed
        yield text
        return
    start = 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk_tokens = tokens[start:end]
        yield enc.decode(chunk_tokens)
        if end >= len(tokens):
            break
        # Slide forward, but overlap with the end of the current chunk
        # to preserve boundary context in the next chunk
        start = end - overlap
# Usage
with open("large_document.txt", encoding="utf-8") as f:
    document = f.read()
chunks = list(chunk_text(document, chunk_size=2_000, overlap=200))
print(f"Split into {len(chunks)} chunks")
enc = tiktoken.encoding_for_model("gpt-4o")
for i, chunk in enumerate(chunks):
    n = len(enc.encode(chunk))
    print(f" Chunk {i+1}: {n:,} tokens")
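Overlap is not free: every repeated token is sent (and billed) twice. The sketch below reproduces the sliding-window arithmetic on token positions alone, with no tokeniser involved, so the chunk count and the overhead can be estimated up front. Both function names are illustrative.

```python
import math


def chunk_spans(n_tokens: int, chunk_size: int, overlap: int) -> list[tuple[int, int]]:
    """Return the (start, end) token offsets a sliding window would produce."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")
    spans: list[tuple[int, int]] = []
    start = 0
    while start < n_tokens:
        end = min(start + chunk_size, n_tokens)
        spans.append((start, end))
        if end >= n_tokens:
            break
        start = end - overlap  # step back by `overlap` tokens
    return spans


def expected_chunk_count(n_tokens: int, chunk_size: int, overlap: int) -> int:
    """Closed-form chunk count: each chunk after the first advances by the stride."""
    if n_tokens <= chunk_size:
        return 1 if n_tokens > 0 else 0
    stride = chunk_size - overlap
    return math.ceil((n_tokens - chunk_size) / stride) + 1


spans = chunk_spans(10_000, chunk_size=2_000, overlap=200)
total_sent = sum(end - start for start, end in spans)
print(len(spans), total_sent)
# → 6 11000
```

For a 10,000-token document, a 200-token overlap adds 1,000 tokens (10%) to what is processed; halving the chunk size roughly doubles that overhead.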
Map-Reduce: Processing Entire Documents
Use when you need to process the whole document (summarise, extract, analyse), not just retrieve from it:
import asyncio
import openai
import tiktoken
client_async = openai.AsyncOpenAI()
async def map_reduce_document(
    document: str,
    task_prompt: str,
    chunk_size: int = 4_000,
    overlap: int = 200,
    model: str = "gpt-4o",
    max_parallel: int = 5,  # Rate limit guard
) -> str:
    """Process a long document with map-reduce: chunk, process, merge.

    Map phase: each chunk processed independently and in parallel.
    Reduce phase: partial results merged by a final LLM call.

    The reduce step is itself bounded by the context window. If you have
    hundreds of chunks, you may need a hierarchical reduce (merge results
    of N chunks at a time, then merge those summaries).
    """
    chunks = list(chunk_text(document, chunk_size=chunk_size, overlap=overlap))
    total = len(chunks)
    print(f"Processing {total} chunks (max {max_parallel} parallel)...")
    semaphore = asyncio.Semaphore(max_parallel)  # Prevent rate limit errors

    async def process_chunk(chunk: str, index: int) -> str:
        async with semaphore:
            response = await client_async.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            f"You are processing section {index + 1} of {total} "
                            f"of a larger document. {task_prompt}"
                        ),
                    },
                    {"role": "user", "content": chunk},
                ],
                max_tokens=500,
            )
            return response.choices[0].message.content

    # Map: process all chunks concurrently (with rate-limit guard)
    partial_results = await asyncio.gather(
        *[process_chunk(chunk, i) for i, chunk in enumerate(chunks)]
    )
    # Build the combined input for the reduce step
    combined = "\n\n---\n\n".join(
        f"Section {i+1} result:\n{result}"
        for i, result in enumerate(partial_results)
    )
    # Check if the combined results fit in the context window
    enc = tiktoken.encoding_for_model(model)
    combined_tokens = len(enc.encode(combined))
    limit = MODEL_CONTEXT_LIMITS.get(model, 128_000)
    if combined_tokens > limit * 0.6:  # Leave room for system + output
        # Hierarchical reduce needed -- for simplicity, truncate here
        # In production: recursively apply map-reduce to the partial results
        combined_tokens_target = int(limit * 0.6)
        combined = enc.decode(enc.encode(combined)[:combined_tokens_target])
        combined += "\n\n[Some sections omitted due to length]"
    # Reduce: merge all partial results
    merge_response = await client_async.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "system",
                "content": "Merge these partial results from multiple sections of a document into a single coherent, well-structured response.",
            },
            {
                "role": "user",
                "content": f"Partial results to merge:\n\n{combined}",
            },
        ],
        max_tokens=1_500,
    )
    return merge_response.choices[0].message.content
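The function above truncates when the partial results outgrow the window and leaves the hierarchical reduce as a note. A minimal synchronous sketch of that idea follows. The `merge` and `count` callables are hypothetical stand-ins: in practice `merge` would be an LLM call and `count` a tokeniser, and the sketch assumes each merge returns something shorter than its combined inputs.

```python
from typing import Callable


def batch_by_budget(items: list[str], count: Callable[[str], int], budget: int) -> list[list[str]]:
    """Greedily group consecutive items so each batch stays within the budget."""
    batches: list[list[str]] = []
    current: list[str] = []
    used = 0
    for item in items:
        size = count(item)
        if current and used + size > budget:
            batches.append(current)
            current, used = [], 0
        current.append(item)
        used += size
    if current:
        batches.append(current)
    return batches


def hierarchical_reduce(
    items: list[str],
    merge: Callable[[list[str]], str],
    count: Callable[[str], int],
    budget: int,
    max_rounds: int = 10,
) -> str:
    """Merge items level by level until a single result remains."""
    if not items:
        raise ValueError("nothing to reduce")
    level = list(items)
    rounds = 0
    while len(level) > 1:
        if rounds == max_rounds:
            raise RuntimeError("reduce did not converge; merges are not shrinking")
        batches = batch_by_budget(level, count, budget)
        if len(batches) == len(level):
            # Every batch holds one item, so no two results fit together
            raise ValueError("budget too small to merge any two results")
        level = [merge(batch) if len(batch) > 1 else batch[0] for batch in batches]
        rounds += 1
    return level[0]


# Stand-ins: length as the "token count", and a "summary" that keeps 2 chars per input
merge_calls = 0

def fake_merge(batch: list[str]) -> str:
    global merge_calls
    merge_calls += 1
    return "".join(part[:2] for part in batch)

result = hierarchical_reduce(["a" * 10] * 8, fake_merge, len, budget=25)
print(result, merge_calls)
```

With eight 10-unit inputs and a budget of 25, the first round merges pairs and the second round merges the four summaries, so five merge calls are made in total.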
Part 6 -- Building a Production ContextManager
This is the class you deploy in real applications. It manages conversation history within a strict token budget, trims when needed, and exposes cost estimates.
import time
from dataclasses import dataclass, field
from collections import deque
from typing import TypedDict

import tiktoken

class ChatMessage(TypedDict):
    role: str
    content: str

@dataclass
class TurnRecord:
    """A single conversation turn with metadata."""
    message: ChatMessage
    token_count: int
    timestamp: float = field(default_factory=time.time)
class ContextManager:
    """Manages conversation history within a token budget.

    Design decisions embedded in this class:
    - System prompt is immutable -- always preserved, never trimmed.
    - The most recent user message is always preserved.
    - History is trimmed from the oldest end first.
    - When removing a user message, its paired assistant response is also
      removed to avoid orphaned turns that confuse the model.
    - Target utilisation provides a safety margin below the hard limit.
    """

    # Token pricing per million tokens (verify current prices before production use)
    PRICING: dict[str, dict[str, float]] = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4-turbo": {"input": 10.00, "output": 30.00},
        "gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
        "claude-opus-4-6": {"input": 15.00, "output": 75.00},
        "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
        "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
    }

    def __init__(
        self,
        model: str = "gpt-4o",
        system_prompt: str = "",
        max_output_tokens: int = 2_000,
        target_utilisation: float = 0.85,
    ):
        """
        Args:
            model: Model name for token counting and pricing.
            system_prompt: The system prompt -- never trimmed.
            max_output_tokens: Tokens reserved for the model's response.
            target_utilisation: Trim history when it reaches this fraction
                of the context limit. Default 0.85 = 15% safety margin.
        """
        self.model = model
        self.system_prompt = system_prompt
        self.max_output_tokens = max_output_tokens
        self.target_utilisation = target_utilisation
        # Use the model's own tiktoken encoding for gpt-* models. For anything
        # else, approximate with gpt-4o's encoding (o200k_base); counts for
        # non-OpenAI models are then estimates, not exact values.
        enc_model = model if model.startswith("gpt") else "gpt-4o"
        self._enc = tiktoken.encoding_for_model(enc_model)
        self._history: deque[TurnRecord] = deque()
        self._total_input_tokens = 0
        self._total_output_tokens = 0
        self._trim_count = 0  # Diagnostic counter
        # Count system prompt tokens once -- it never changes
        # +4 for message overhead (delimiter tokens)
        self._system_tokens = (
            len(self._enc.encode(system_prompt)) + 4 if system_prompt else 0
        )
    @property
    def context_limit(self) -> int:
        return MODEL_CONTEXT_LIMITS.get(self.model, 128_000)

    @property
    def _history_tokens(self) -> int:
        return sum(r.token_count for r in self._history)

    @property
    def _budget_for_history(self) -> int:
        """Maximum tokens available for conversation history."""
        # Reserve: system prompt + output tokens + 50 tokens overhead buffer
        reserved = self._system_tokens + self.max_output_tokens + 50
        target = int(self.context_limit * self.target_utilisation)
        return max(0, target - reserved)

    def _count_message_tokens(self, message: ChatMessage) -> int:
        """Count tokens for one message including formatting overhead."""
        # 3 tokens for message delimiters + 1 for the role field
        return len(self._enc.encode(message["content"])) + 4

    def add_user_message(self, content: str) -> int:
        """Add a user message. Trims old history if necessary to make room.

        Returns:
            Number of turns trimmed (0 if no trimming was needed).
        """
        message: ChatMessage = {"role": "user", "content": content}
        token_count = self._count_message_tokens(message)
        trimmed = self._trim_to_fit(token_count)
        self._history.append(TurnRecord(message=message, token_count=token_count))
        return trimmed
    def add_assistant_message(self, content: str, output_tokens: int = 0) -> None:
        """Add an assistant response to the history.

        Args:
            content: The assistant's response text.
            output_tokens: Actual output tokens from response.usage.completion_tokens.
                Used for cost tracking. Pass 0 if unavailable.
        """
        # The call that produced this response sent the system prompt plus the
        # history as it stands now (before the response is appended). Accumulate
        # that estimate so the session input total covers every call made.
        self._total_input_tokens += self._system_tokens + self._history_tokens
        message: ChatMessage = {"role": "assistant", "content": content}
        token_count = self._count_message_tokens(message)
        self._history.append(TurnRecord(message=message, token_count=token_count))
        # Track for session cost accounting
        if output_tokens:
            self._total_output_tokens += output_tokens
    def _trim_to_fit(self, incoming_tokens: int) -> int:
        """Remove oldest turns until history fits within the budget.

        Maintains turn coherence: never removes a user message without
        also removing its paired assistant response.

        Returns:
            Number of TurnRecord objects removed.
        """
        budget = self._budget_for_history - incoming_tokens
        if self._history_tokens <= budget:
            return 0  # No trimming needed
        removed = 0
        while self._history_tokens > budget and self._history:
            evicted = self._history.popleft()
            removed += 1
            # If we removed a user message, also remove the following
            # assistant message to avoid a history that starts with an
            # assistant turn (confuses some models)
            if (
                evicted.message["role"] == "user"
                and self._history
                and self._history[0].message["role"] == "assistant"
            ):
                self._history.popleft()
                removed += 1
        self._trim_count += removed
        return removed

    def build_messages(self) -> list[ChatMessage]:
        """Build the messages list for the API call.

        The returned list is a new list -- safe to modify without
        affecting the manager's internal state.
        """
        messages: list[ChatMessage] = []
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt})
        messages.extend(record.message for record in self._history)
        return messages
    def estimate_cost(self, actual_output_tokens: int = 0) -> dict[str, float]:
        """Estimate cost for the current context state.

        Args:
            actual_output_tokens: If you have the real output token count
                from a completed API call, pass it here for accuracy.
                Otherwise uses max_output_tokens as the estimate.

        Returns:
            Dict with token counts and USD cost estimates.
        """
        pricing = self.PRICING.get(self.model, {"input": 5.0, "output": 15.0})
        current_input = self._system_tokens + self._history_tokens
        output_est = actual_output_tokens or self.max_output_tokens
        return {
            "input_tokens": current_input,
            "output_tokens_estimate": output_est,
            "input_cost_usd": round((current_input / 1_000_000) * pricing["input"], 6),
            "output_cost_usd": round((output_est / 1_000_000) * pricing["output"], 6),
            "total_cost_usd": round(
                (current_input / 1_000_000) * pricing["input"]
                + (output_est / 1_000_000) * pricing["output"],
                6,
            ),
            # Recorded session input plus the context the next call would send
            "session_input_tokens": self._total_input_tokens + current_input,
            "session_output_tokens": self._total_output_tokens,
            "turns_trimmed_total": self._trim_count,
        }

    def status(self) -> str:
        """Human-readable context window status line."""
        total = self._system_tokens + self._history_tokens
        pct = (total / self.context_limit) * 100
        bar_len = 30
        filled = int(bar_len * pct / 100)
        bar = "#" * filled + "-" * (bar_len - filled)
        return (
            f"[{bar}] {pct:.1f}% | "
            f"{total:,}/{self.context_limit:,} tokens | "
            f"{len(self._history)} messages in history"
        )
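The eviction rule is easier to reason about in isolation. The sketch below applies the same oldest-first, pair-preserving policy to a plain list of messages, using a whitespace word count as a stand-in for the tokeniser so it runs without tiktoken. The function name and the stand-in counter are illustrative, not part of the class.

```python
from collections import deque
from typing import Callable


def trim_oldest_pairs(
    history: list[dict],
    budget: int,
    count: Callable[[dict], int] = lambda m: len(m["content"].split()),
) -> list[dict]:
    """Drop the oldest turns until the total size fits within the budget."""
    turns = deque(history)
    total = sum(count(m) for m in turns)
    while turns and total > budget:
        evicted = turns.popleft()
        total -= count(evicted)
        # Keep turns coherent: a removed user message takes its reply with it
        if evicted["role"] == "user" and turns and turns[0]["role"] == "assistant":
            total -= count(turns.popleft())
    return list(turns)


history = [
    {"role": "user", "content": "first question here"},
    {"role": "assistant", "content": "a four word answer"},
    {"role": "user", "content": "second question"},
    {"role": "assistant", "content": "ok"},
]
kept = trim_oldest_pairs(history, budget=3)
print([m["content"] for m in kept])
# → ['second question', 'ok']
```

Removing the reply together with its question means the surviving history always starts on a user turn, which is the property the class's design notes call out.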
Using the ContextManager in a Chat Loop
import openai
client = openai.OpenAI()
def run_chat_session() -> None:
    """Multi-turn chat with automatic context management."""
    ctx = ContextManager(
        model="gpt-4o",
        system_prompt=(
            "You are a senior Python engineer helping a team build production "
            "LLM applications. Be concise -- answer in 2-4 sentences unless "
            "the question requires more detail."
        ),
        max_output_tokens=500,
        target_utilisation=0.80,  # Trim early to avoid close-to-limit panics
    )
    questions = [
        "What is the difference between a context window and model memory?",
        "How does RAG solve the context limit problem?",
        "Should I summarise history or truncate it for a coding assistant?",
    ]
    for question in questions:
        trimmed = ctx.add_user_message(question)
        if trimmed:
            print(f"[Trimmed {trimmed} old message(s) to make room]")
        print(f"Status: {ctx.status()}")
        cost = ctx.estimate_cost()
        print(f"Estimated cost this call: ${cost['total_cost_usd']:.5f}")
        response = client.chat.completions.create(
            model=ctx.model,
            messages=ctx.build_messages(),
            # Keep the API cap in sync with the budget the manager reserved
            max_tokens=ctx.max_output_tokens,
        )
        answer = response.choices[0].message.content
        actual_output = response.usage.completion_tokens
        ctx.add_assistant_message(answer, output_tokens=actual_output)
        print(f"Q: {question}")
        print(f"A: {answer}")
        print()
    # Session summary
    final_cost = ctx.estimate_cost()
    print(f"Session input tokens: {final_cost['session_input_tokens']:,}")
    print(f"Session output tokens: {final_cost['session_output_tokens']:,}")
Part 7 -- Cost Estimation and Session Tracking
from dataclasses import dataclass, field
from datetime import datetime
@dataclass(frozen=True)  # frozen makes the record immutable, as the docstring promises
class APICallRecord:
    """Immutable record of a single LLM API call."""
    model: str
    input_tokens: int
    output_tokens: int
    timestamp: datetime = field(default_factory=datetime.now)
    purpose: str = ""  # e.g. "user_chat", "document_summary", "rerank"

    @property
    def cost_usd(self) -> float:
        pricing = ContextManager.PRICING.get(
            self.model, {"input": 5.0, "output": 15.0}
        )
        return (
            (self.input_tokens / 1_000_000) * pricing["input"]
            + (self.output_tokens / 1_000_000) * pricing["output"]
        )
class SpendTracker:
    """Session-level cost and token tracker.

    Answers the questions: "How much did this user session cost?"
    and "Which feature accounts for most of our API spend?"

    In production, persist records to a database and aggregate them
    at the user/tenant/feature level for billing and cost attribution.
    """

    def __init__(self, budget_usd: float | None = None):
        """
        Args:
            budget_usd: Optional spending cap. Raises BudgetExceededError
                if a call would exceed this limit.
        """
        self._calls: list[APICallRecord] = []
        self._budget_usd = budget_usd

    def record(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        purpose: str = "",
    ) -> APICallRecord:
        """Record an API call. Raises BudgetExceededError if over budget."""
        call = APICallRecord(
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            purpose=purpose,
        )
        if self._budget_usd is not None:
            projected = self.total_cost_usd + call.cost_usd
            if projected > self._budget_usd:
                raise BudgetExceededError(
                    f"Call would bring session spend to ${projected:.4f}, "
                    f"exceeding budget of ${self._budget_usd:.4f}. "
                    f"Remaining budget: ${self.remaining_budget_usd:.4f}"
                )
        self._calls.append(call)
        return call
    @property
    def total_cost_usd(self) -> float:
        return sum(c.cost_usd for c in self._calls)

    @property
    def remaining_budget_usd(self) -> float | None:
        if self._budget_usd is None:
            return None
        return self._budget_usd - self.total_cost_usd

    @property
    def total_input_tokens(self) -> int:
        return sum(c.input_tokens for c in self._calls)

    @property
    def total_output_tokens(self) -> int:
        return sum(c.output_tokens for c in self._calls)

    def breakdown_by_purpose(self) -> dict[str, dict]:
        """Cost breakdown grouped by feature/purpose."""
        breakdown: dict[str, dict] = {}
        for call in self._calls:
            key = call.purpose or "unknown"
            if key not in breakdown:
                breakdown[key] = {
                    "calls": 0, "input_tokens": 0,
                    "output_tokens": 0, "cost_usd": 0.0,
                }
            breakdown[key]["calls"] += 1
            breakdown[key]["input_tokens"] += call.input_tokens
            breakdown[key]["output_tokens"] += call.output_tokens
            breakdown[key]["cost_usd"] += call.cost_usd
        # Sort by cost descending so the most expensive features appear first
        return dict(
            sorted(breakdown.items(), key=lambda x: x[1]["cost_usd"], reverse=True)
        )
    def summary(self) -> str:
        lines = [
            f"Session: {len(self._calls)} API call(s)",
            f" Tokens: {self.total_input_tokens:,} in / {self.total_output_tokens:,} out",
            f" Cost: ${self.total_cost_usd:.5f} USD",
        ]
        if self._budget_usd is not None:
            lines.append(f" Budget: ${self._budget_usd:.4f} (${self.remaining_budget_usd:.5f} remaining)")
        if self._calls:
            lines.append("")
            lines.append(" By purpose:")
            for purpose, data in self.breakdown_by_purpose().items():
                lines.append(
                    f" {purpose:<25} {data['calls']:>3} calls "
                    f"{data['input_tokens']:>8,} in / {data['output_tokens']:>7,} out "
                    f"${data['cost_usd']:.5f}"
                )
        return "\n".join(lines)

class BudgetExceededError(RuntimeError):
    """Raised when an API call would exceed the configured spending budget."""
    pass
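The per-call formula behind cost_usd is simple enough to check by hand. The worked example below uses the gpt-4o prices from the PRICING table in this lesson (2.50 USD input and 10.00 USD output per million tokens, to be verified against current pricing); the function name and the token counts are illustrative.

```python
def call_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_price_per_million: float,
    output_price_per_million: float,
) -> float:
    """Cost of one call: each side is tokens / 1,000,000 times its per-million price."""
    return (
        input_tokens / 1_000_000 * input_price_per_million
        + output_tokens / 1_000_000 * output_price_per_million
    )


# 10,000 input tokens and 500 output tokens at the lesson's gpt-4o prices:
#   input:  10,000 / 1,000,000 * 2.50  = 0.025
#   output:    500 / 1,000,000 * 10.00 = 0.005
cost = call_cost_usd(10_000, 500, 2.50, 10.00)
print(f"${cost:.4f}")
# → $0.0300

# How many such calls fit in a 1 USD session budget?
print(int(1.00 // cost))
# → 33
```

Output tokens are four times as expensive as input tokens at these prices, so a 500-token reply costs as much as 2,000 tokens of prompt.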
Part 8 -- Prompt Compression
When you need to reduce token usage without changing the model or truncating critical content, compression is the lever.
import re

import tiktoken


def compress_whitespace(text: str) -> str:
    """Remove redundant whitespace.

    Lossless for prose. Not lossless for whitespace-sensitive content such as
    Python or YAML, because leading indentation is collapsed too.
    Typical savings: 2-8% for heavily formatted documents.
    """
    text = re.sub(r"[ \t]+", " ", text)      # Collapse runs of spaces/tabs
    text = re.sub(r" +\n", "\n", text)       # No trailing spaces on lines
    text = re.sub(r"\n{3,}", "\n\n", text)   # At most one blank line in a row
    return text.strip()


def strip_markdown(text: str) -> str:
    """Remove markdown syntax, keeping only text content.

    Typical savings: 5-20% on heavily formatted docs.
    Warning: loses structural information (headers, emphasis, links).
    Use only when the model does not need the formatting.
    """
    text = re.sub(r"^#{1,6}\s+", "", text, flags=re.MULTILINE)   # Headers
    text = re.sub(r"\*{1,3}(.+?)\*{1,3}", r"\1", text)           # Bold/italic (asterisks)
    # Bold/italic (underscores). The lookarounds leave snake_case identifiers alone.
    text = re.sub(r"(?<!\w)_{1,3}(.+?)_{1,3}(?!\w)", r"\1", text)
    text = re.sub(r"`(.+?)`", r"\1", text)                       # Inline code
    text = re.sub(r"^---+\s*$", "", text, flags=re.MULTILINE)    # HR lines
    text = re.sub(r"\[(.+?)\]\(.+?\)", r"\1", text)              # Links (keep text)
    return compress_whitespace(text)
def compress_to_budget(
    text: str,
    target_tokens: int,
    model: str = "gpt-4o",
) -> tuple[str, str]:
    """Progressively compress text to fit within a token budget.

    Applies compression in order of increasing aggressiveness:
    1. Whitespace normalisation (lossless for prose)
    2. Markdown stripping (structural loss, content preserved)
    3. Token-level truncation with sentinel (lossy)

    Returns:
        Tuple of (compressed_text, method_used)
    """
    enc = tiktoken.encoding_for_model(model)
    # Stage 1: lossless
    compressed = compress_whitespace(text)
    if len(enc.encode(compressed)) <= target_tokens:
        return compressed, "whitespace_normalised"
    # Stage 2: structural loss
    compressed = strip_markdown(text)
    if len(enc.encode(compressed)) <= target_tokens:
        return compressed, "markdown_stripped"
    # Stage 3: truncation (last resort)
    sentinel = "\n... [content truncated to fit context window]"
    sentinel_tokens = len(enc.encode(sentinel))
    tokens = enc.encode(compressed)
    # Clamp at zero: a negative slice bound would keep almost all of the text.
    # If target_tokens is smaller than the sentinel itself, the result is the
    # sentinel alone and still exceeds the budget.
    keep = max(0, target_tokens - sentinel_tokens)
    return enc.decode(tokens[:keep]) + sentinel, "truncated"
# Measure compression on real content types
def benchmark_compression(text: str, label: str, model: str = "gpt-4o") -> None:
    enc = tiktoken.encoding_for_model(model)
    original_n = len(enc.encode(text))
    if original_n == 0:
        # Avoid dividing by zero in the percentage calculations below
        print(f"{label}: empty input, nothing to measure")
        return
    ws_compressed = compress_whitespace(text)
    ws_n = len(enc.encode(ws_compressed))
    md_compressed = strip_markdown(text)
    md_n = len(enc.encode(md_compressed))
    print(f"{label}:")
    print(f" Original: {original_n:5} tokens")
    print(f" After whitespace: {ws_n:5} tokens ({100*(original_n-ws_n)/original_n:.1f}% saved)")
    print(f" After markdown strip:{md_n:5} tokens ({100*(original_n-md_n)/original_n:.1f}% saved)")
    print()
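To see what whitespace normalisation does to different content, here is a small sketch that restates compress_whitespace so it runs on its own and applies it to a prose string and a Python snippet. The two sample strings are made up for illustration.

```python
import re


def compress_whitespace(text: str) -> str:
    """Same three substitutions as the lesson's compress_whitespace."""
    text = re.sub(r"[ \t]+", " ", text)      # Collapse runs of spaces/tabs
    text = re.sub(r" +\n", "\n", text)       # Drop trailing spaces
    text = re.sub(r"\n{3,}", "\n\n", text)   # At most one blank line in a row
    return text.strip()


prose = "Total   cost:\t$4.20   \n\n\n\nSee   appendix."
code = "def f(x):\n    if x:\n        return 1\n    return 0"

print(repr(compress_whitespace(prose)))
# 'Total cost: $4.20\n\nSee appendix.'
print(repr(compress_whitespace(code)))
# 'def f(x):\n if x:\n return 1\n return 0'
```

The prose keeps its meaning, but the Python snippet loses its nesting because every indent collapses to a single space. For source code, YAML, or other indentation-sensitive input, it is safer to skip this step or to limit it to trailing spaces and blank lines.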
Part 9 -- Dynamic Context Selection
Rather than including everything and hoping it fits, rank content by relevance and include only what matters for the current query.
import math
from collections import Counter


def tf_idf_score(query: str, document: str) -> float:
    """Relevance score using a term-frequency heuristic.

    Despite the name, there is no inverse-document-frequency term: each
    document is scored on its own, so common words count as much as rare ones.
    Terms are split on whitespace only, so "budget?" does not match "budget".

    For production use, replace this with embedding-based cosine similarity
    (covered in the Vector Search lesson). This lightweight version is useful
    when you cannot afford additional embedding API calls -- for example,
    when doing context trimming inside a tight latency budget.
    """
    if not document.strip():
        return 0.0
    query_terms = set(query.lower().split())
    doc_terms = document.lower().split()
    term_counts = Counter(doc_terms)
    scores = []
    for term in query_terms:
        # Dividing by document length keeps long documents from winning on size alone
        tf = term_counts.get(term, 0) / len(doc_terms)
        scores.append(math.log(1 + tf))  # Log dampens the effect of repeated terms
    return sum(scores) / len(query_terms) if query_terms else 0.0
def select_relevant_chunks(
    query: str,
    chunks: list[str],
    token_budget: int,
    model: str = "gpt-4o",
    min_score: float = 0.001,
) -> list[str]:
    """Select the most relevant chunks that fit within a token budget.

    Algorithm:
    1. Score each chunk by relevance to the query.
    2. Sort by score descending.
    3. Greedily include chunks until the budget is exhausted.
       Skip chunks that are individually too large (do not abort).

    Args:
        query: The user's question.
        chunks: Candidate text chunks.
        token_budget: Max tokens to use for context.
        model: Model for token counting.
        min_score: Filter out chunks with score below this threshold.

    Returns:
        Selected chunks in descending relevance order. Their combined token
        count does not exceed token_budget.
    """
    enc = tiktoken.encoding_for_model(model)
    # Score and filter (each chunk is scored once)
    scored: list[tuple[str, float]] = []
    for chunk in chunks:
        score = tf_idf_score(query, chunk)
        if score >= min_score:
            scored.append((chunk, score))
    scored.sort(key=lambda x: x[1], reverse=True)
    # Greedy packing: include highest-scoring chunks first
    selected: list[str] = []
    tokens_used = 0
    for chunk, _score in scored:
        chunk_tokens = len(enc.encode(chunk))
        if tokens_used + chunk_tokens <= token_budget:
            selected.append(chunk)
            tokens_used += chunk_tokens
    return selected
def build_rag_context(
    query: str,
    documents: list[str],
    context_budget: int = 6_000,
    chunk_size: int = 500,
    model: str = "gpt-4o",
) -> tuple[str, int]:
    """Chunk documents, select relevant pieces, build a context string.

    Returns:
        Tuple of (context_string, total_tokens_used)
    """
    enc = tiktoken.encoding_for_model(model)
    # Chunk all documents
    all_chunks: list[str] = []
    for doc in documents:
        all_chunks.extend(chunk_text(doc, chunk_size=chunk_size, overlap=50))
    # Select relevant chunks
    relevant = select_relevant_chunks(
        query=query,
        chunks=all_chunks,
        token_budget=context_budget,
        model=model,
    )
    if not relevant:
        return "", 0
    # Format with minimal separators (separators cost tokens too)
    separator = "\n---\n"
    context = separator.join(relevant)
    # Selection counted chunk tokens only, so the separators can push the
    # joined string over budget. Drop the lowest-scoring chunk until it fits.
    while relevant and len(enc.encode(context)) > context_budget:
        relevant.pop()
        context = separator.join(relevant)
    return context, len(enc.encode(context))
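The greedy packing step can be tried without tiktoken. The sketch below is not the lesson's implementation: it swaps in a whitespace word count for the tokeniser and a simple word-overlap score for the relevance function, and both stand-ins (word_count, overlap_score) and the name greedy_pack are hypothetical.

```python
def word_count(text: str) -> int:
    """Stand-in token counter (assumption): one token per whitespace-separated word."""
    return len(text.split())


def overlap_score(query: str, chunk: str) -> float:
    """Stand-in scorer (assumption): fraction of query words present in the chunk."""
    q = set(query.lower().split())
    c = set(chunk.lower().split())
    return len(q & c) / len(q) if q else 0.0


def greedy_pack(query, chunks, budget, min_score=0.001):
    """Rank chunks by score, then include each one that still fits the budget."""
    scored = [(ch, overlap_score(query, ch)) for ch in chunks]
    scored = [pair for pair in scored if pair[1] >= min_score]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    selected, used = [], 0
    for chunk, _score in scored:
        n = word_count(chunk)
        if used + n <= budget:  # Too large: skip it and keep going
            selected.append(chunk)
            used += n
    return selected, used


chunks = [
    "token budget " + "padding " * 20,        # Relevant, but 22 words
    "token budget limits apply per request",  # 6 words
    "budget planning for teams",              # 4 words, partial match
    "unrelated text about cats",              # No overlap: filtered out
]
selected, used = greedy_pack("token budget", chunks, budget=12)
print(selected)  # ['token budget limits apply per request', 'budget planning for teams']
print(used)      # 10
```

The oversized first chunk is skipped rather than ending the loop, which is what lets the smaller, lower-ranked chunks still use the remaining budget.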
Part 10 -- Multi-Turn Conversation: Summarise vs Trim
class SummarisedContextManager(ContextManager):
    """ContextManager variant that summarises old turns instead of discarding them.

    When history exceeds the trim threshold, the oldest half of turns
    is summarised with a cheap fast model and stored as a compressed
    summary. The summary is prepended to the system prompt on every
    subsequent call, preserving semantic context at a fraction of the
    original token cost.

    Trade-off: each summarisation costs one additional API call
    (latency + ~$0.0001 with gpt-4o-mini). Prefer this over raw
    trimming when the application needs to remember what was discussed
    earlier in long sessions (e.g., research assistants, tutors).
    """

    def __init__(self, *args, summarise_model: str = "gpt-4o-mini", **kwargs):
        super().__init__(*args, **kwargs)
        self._summary: str = ""
        self._summarise_model = summarise_model
        self._summarisations: int = 0
    def _summarise_oldest_half(self) -> None:
        """Summarise the oldest half of conversation history."""
        if len(self._history) < 4:
            # Too few turns to bother summarising -- just trim.
            # Call the parent implementation: this class overrides
            # _trim_to_fit to call this method, so self._trim_to_fit(0)
            # could recurse without end.
            super()._trim_to_fit(0)
            return
        half = len(self._history) // 2
        to_summarise = list(self._history)[:half]
        # Build plain-text representation of the turns to summarise
        turns_text = "\n".join(
            f"{r.message['role'].upper()}: {r.message['content']}"
            for r in to_summarise
        )
        # Use a cheap model for summarisation -- no need for gpt-4o here
        import openai
        client = openai.OpenAI()
        response = client.chat.completions.create(
            model=self._summarise_model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Summarise this conversation fragment in 3-5 sentences. "
                        "Preserve key decisions, facts established, and the user's goals. "
                        "Omit pleasantries and repeated clarifications."
                    )
                },
                {"role": "user", "content": turns_text},
            ],
            max_tokens=200,
        )
        # message.content can be None, so fall back to an empty string
        new_summary = response.choices[0].message.content or ""
        # Append to any existing summary (older context first)
        if self._summary:
            self._summary = f"{self._summary}\n\nLater: {new_summary}"
        else:
            self._summary = new_summary
        # Remove the summarised turns from live history. If ContextManager
        # stores _history_tokens as a counter rather than computing it from
        # the records, it must be decremented here as well.
        for _ in range(half):
            if self._history:
                self._history.popleft()
        self._summarisations += 1
    def _trim_to_fit(self, incoming_tokens: int) -> int:
        """Override: trigger summarisation instead of discarding turns."""
        budget = self._budget_for_history - incoming_tokens
        if self._history_tokens <= budget:
            return 0
        before = len(self._history)
        self._summarise_oldest_half()
        after = len(self._history)
        removed = before - after
        self._trim_count += removed
        return removed
    def build_messages(self) -> list[ChatMessage]:
        """Build messages with the running summary embedded in the system prompt."""
        messages: list[ChatMessage] = []
        if self.system_prompt or self._summary:
            parts = []
            if self.system_prompt:
                parts.append(self.system_prompt)
            if self._summary:
                parts.append(f"## Earlier Conversation Summary\n{self._summary}")
            messages.append({"role": "system", "content": "\n\n".join(parts)})
        messages.extend(record.message for record in self._history)
        return messages
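The history bookkeeping in the summarising manager can be exercised without an API call. The sketch below is an illustration under assumptions, not the class above: fold_oldest_half is a hypothetical free function, messages are plain dicts rather than history records, and fake_summarise is a stub standing in for the model call.

```python
from collections import deque
from typing import Callable


def fold_oldest_half(
    history: deque, summary: str, summarise: Callable[[str], str]
) -> str:
    """Move the oldest half of history into the running summary."""
    half = len(history) // 2
    if half == 0:  # Nothing worth summarising
        return summary
    oldest = [history.popleft() for _ in range(half)]
    turns_text = "\n".join(f"{m['role'].upper()}: {m['content']}" for m in oldest)
    new_summary = summarise(turns_text)
    # Older context first, newer summary appended after it
    return f"{summary}\n\nLater: {new_summary}" if summary else new_summary


def fake_summarise(text: str) -> str:
    """Stub for the model call: reports how many turns it was given."""
    return f"[summary of {len(text.splitlines())} turns]"


history = deque(
    {"role": "user" if i % 2 == 0 else "assistant", "content": f"message {i}"}
    for i in range(8)
)

summary = fold_oldest_half(history, "", fake_summarise)
print(summary, len(history))  # [summary of 4 turns] 4

summary = fold_oldest_half(history, summary, fake_summarise)
print(repr(summary))  # '[summary of 4 turns]\n\nLater: [summary of 2 turns]'
print(len(history))   # 2
```

Passing the summariser in as a function keeps the deque logic testable and makes it easy to swap models. Note that the running summary itself grows with every fold, so a long session may eventually need the summary re-summarised too.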
Key Takeaways
- Tokens are the unit of LLM economics: every design decision in LLM applications trades tokens for quality, latency, or cost. Count before you call.
- Tokenisers are model-specific: use tiktoken for OpenAI models, the Anthropic SDK's count_tokens for Claude. Never cross-apply.
- Chat message overhead is real: each message adds 3-4 formatting tokens beyond the content. Account for this in any pre-flight check.
- The context window must contain your output reservation: setting max_tokens=4000 on an 8K model leaves only about 4K for input.
- Sliding window + map-reduce handles documents larger than the context window. Fixed-size chunking with overlap prevents losing information at chunk boundaries.
- The ContextManager pattern is the production answer to growing conversation history. Choose trim (cheap, lossy) vs summarise (one extra API call, semantically preserving) at design time.
- Dynamic context selection -- scoring and ranking chunks before including them -- is more token-efficient than concatenating everything. This is the conceptual foundation of RAG.
- Session-level cost tracking is a first-class feature in production LLM systems, not an afterthought. Know which feature costs what before you are surprised by an invoice.
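The output-reservation arithmetic from the takeaways can be made concrete. This is a minimal sketch that assumes an 8K window means 8,192 tokens; max_input_tokens is a hypothetical helper, and overhead_tokens stands for whatever per-message formatting cost you have measured.

```python
def max_input_tokens(
    context_window: int, max_output_tokens: int, overhead_tokens: int = 0
) -> int:
    """Tokens left for the prompt after reserving output and formatting overhead."""
    available = context_window - max_output_tokens - overhead_tokens
    if available <= 0:
        raise ValueError("Output reservation leaves no room for input")
    return available


print(max_input_tokens(8_192, 4_000))      # 4192
print(max_input_tokens(8_192, 4_000, 12))  # 4180
```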
Practice Problems
Problem 1: Token Budget Analyser
Write a function analyse_token_budget(messages, model, max_output_tokens) that returns a formatted table showing:
- Each message: role, content preview (first 50 chars), token count
- Running cumulative token count after each message
- Total including formatting overhead and the output reservation
- A clear warning if the total would exceed the model's context limit
Problem 2: Smart Priority Trimmer
The ContextManager._trim_to_fit method removes turn pairs from the oldest end. Implement a smarter version PriorityContextManager that:
- Tags each added message with a priority (1=low, 2=normal, 3=high)
- When trimming is needed, removes lowest-priority turns first regardless of age
- Has a pin_message(turn_index) method that prevents a specific turn from ever being trimmed
- Still ensures no orphaned turns (user without assistant or vice versa)
Problem 3: Compression Benchmark
Choose 5 text types (news article, Python source file, JSON payload, academic abstract, chat transcript). For each:
- Count raw tokens with tiktoken
- Apply compress_whitespace and measure token reduction
- Apply strip_markdown and measure token reduction
Which type benefits most from compression? Which benefits least? Write a 3-sentence explanation based on your understanding of BPE tokenisation and why certain content patterns produce more tokens.
Problem 4: Cost-Aware Streaming Pipeline
Build a CostAwarePipeline that wraps SpendTracker and integrates with the streaming API. It should:
- Accept a session_budget_usd limit at construction
- Before each streaming call, estimate the cost and raise BudgetExceededError if it would exceed the limit
- After each streaming call completes, record the actual token usage from response.usage
- Expose a remaining_budget_usd property
- Log a warning (not error) when 80% of the budget is consumed
Problem 5: Multi-Document Context Packer
You have 50 short documents (each 200-800 tokens). A user asks a question and you need to pack as many relevant documents as possible into an 8,000-token context window, leaving 2,000 tokens for the answer.
Implement pack_context(query, documents, context_budget) that:
- Scores each document for relevance to the query
- Uses greedy bin-packing to maximise document count
- Formats selected documents with token-efficient separators
- Returns (packed_context_string, list_of_included_indices)
Include a configurable min_relevance_score parameter. Write a brief analysis: for this task, is it better to include 8 highly relevant documents or 20 moderately relevant ones, and why?
